//package com.flink.java.demo.sink;
//
//import com.flink.java.demo.bean.WaterSensor;
//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.functions.sink.SinkFunction;
//
//import java.sql.PreparedStatement;
//import java.sql.SQLException;
//
///**
// * @author cjp
// * @version 1.0
// */
//public class SinkDemo4_MySQL {
//    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//
//        SingleOutputStreamOperator<WaterSensor> sensorDS = env
//                .socketTextStream("hadoop102", 7777)
//                .map(value -> {
//                    String[] datas = value.split(",");
//                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
//                });
//
//
//        /**
//         *  写入mysql
//         * 1、只能用老的sink写法： addsink
//         * 2、JDBCSink的4个参数:
//         *    第一个参数： 执行的sql，一般就是 insert into
//         *    第二个参数： 预编译sql， 对占位符填充值
//         *    第三个参数： 执行选项 ---》 攒批、重试
//         *    第四个参数： 连接选项 ---》 url、用户名、密码
//         */
//        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
//                "insert into ws values(?,?,?)",
//                new JdbcStatementBuilder<WaterSensor>() {
//                    @Override
//                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
//                        //每收到一条WaterSensor，如何去填充占位符
//                        preparedStatement.setString(1, waterSensor.getId());
//                        preparedStatement.setLong(2, waterSensor.getTs());
//                        preparedStatement.setInt(3, waterSensor.getVc());
//                    }
//                },
//                JdbcExecutionOptions.builder()
//                        .withMaxRetries(3) // 重试次数
//                        .withBatchSize(100) // 批次的大小：条数
//                        .withBatchIntervalMs(3000) // 批次的时间  【条数或者时间，达到其中会一个就会写入数据库】
//                        .build(),
//                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
//                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
//                        .withUsername("root")
//                        .withPassword("000000")
//                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
//                        .build()
//        );
//        sensorDS.addSink(jdbcSink);
//        env.execute();
//    }
//}
